airflow dag_run

工作流概述

这是一个包含12个节点的复杂工作流,主要用于自动化处理各种任务。

工作流源代码

下载
{
  "id": "Y5URlIlbX4HDzWKA",
  "meta": {
    "instanceId": "6ae0aa8b6c9099f1f8ed1265281802eab47aaf5b2845f317791e4ac7ee5b7279",
    "templateCredsSetupCompleted": true
  },
  "name": "airflow dag_run",
  "tags": [
    {
      "id": "YSelDQ3zfWB0LeVn",
      "name": "airflow",
      "createdAt": "2025-02-25T04:17:21.943Z",
      "updatedAt": "2025-02-25T04:17:21.943Z"
    }
  ],
  "nodes": [
    {
      "id": "0d4457ef-7a88-4755-8bd2-b0e8148f86f4",
      "name": "Airflow: dag_run",
      "type": "n8n-nodes-base.httpRequest",
      "position": [
        380,
        -40
      ],
      "parameters": {
        "url": "={{ $('airflow-api').item.json.prefix }}/api/v1/dags/{{ $('in data').item.json.dag_id }}/dagRuns",
        "method": "POST",
        "options": {},
        "jsonBody": "={
  \"conf\": {{ $('in data').item.json.conf }}
}",
        "sendBody": true,
        "specifyBody": "json",
        "authentication": "genericCredentialType",
        "genericAuthType": "httpBasicAuth"
      },
      "credentials": {
        "httpBasicAuth": {
          "id": "vTR4WWA7czRn2ULn",
          "name": "Airflow"
        }
      },
      "typeVersion": 4.2
    },
    {
      "id": "acf477a0-aad5-402a-a5a0-543f3e277333",
      "name": "Airflow: dag_run - state",
      "type": "n8n-nodes-base.httpRequest",
      "position": [
        840,
        60
      ],
      "parameters": {
        "url": "={{ $('airflow-api').item.json.prefix }}/api/v1/dags/{{ $('in data').item.json.dag_id }}/dagRuns/{{ $('Airflow: dag_run').item.json.dag_run_id }}",
        "options": {},
        "authentication": "genericCredentialType",
        "genericAuthType": "httpBasicAuth"
      },
      "credentials": {
        "httpBasicAuth": {
          "id": "vTR4WWA7czRn2ULn",
          "name": "Airflow"
        }
      },
      "typeVersion": 4.2
    },
    {
      "id": "26982a6f-6281-4140-a05c-ea6f3f2c0cb2",
      "name": "count",
      "type": "n8n-nodes-base.code",
      "position": [
        1180,
        40
      ],
      "parameters": {
        "jsCode": "try {
  $('count').first().json.count += 1
  return {count:$('count').first().json.count};
}
catch (error) {
  return {count:1};
}"
      },
      "typeVersion": 2
    },
    {
      "id": "613718f6-ba7e-415c-8e07-0123224e1ac6",
      "name": "dag run fail",
      "type": "n8n-nodes-base.stopAndError",
      "position": [
        1180,
        400
      ],
      "parameters": {
        "errorMessage": "dag run fail"
      },
      "typeVersion": 1
    },
    {
      "id": "66ba0e85-4608-418b-b27b-8cbc50f78319",
      "name": "if state == queued",
      "type": "n8n-nodes-base.if",
      "position": [
        520,
        60
      ],
      "parameters": {
        "options": {},
        "conditions": {
          "options": {
            "version": 2,
            "leftValue": "",
            "caseSensitive": true,
            "typeValidation": "strict"
          },
          "combinator": "and",
          "conditions": [
            {
              "id": "0ae67986-09c0-443d-9262-075bfe94ca2e",
              "operator": {
                "name": "filter.operator.equals",
                "type": "string",
                "operation": "equals"
              },
              "leftValue": "={{ $json.state }}",
              "rightValue": "queued"
            }
          ]
        }
      },
      "typeVersion": 2.2
    },
    {
      "id": "92176e9a-0ea7-48b0-9ca0-ea4ea8442cee",
      "name": "dag run wait too long",
      "type": "n8n-nodes-base.stopAndError",
      "position": [
        1500,
        40
      ],
      "parameters": {
        "errorMessage": "dag run wait too long"
      },
      "typeVersion": 1
    },
    {
      "id": "6a05471f-232e-44d6-9dbb-583400537507",
      "name": "Airflow: dag_run - get result",
      "type": "n8n-nodes-base.httpRequest",
      "position": [
        1180,
        -140
      ],
      "parameters": {
        "url": "={{ $('airflow-api').item.json.prefix }}/api/v1/dags/{{ $('in data').item.json.dag_id }}/dagRuns/{{ $('Airflow: dag_run').item.json.dag_run_id }}/taskInstances/{{ $('in data').item.json.task_id }}/xcomEntries/return_value",
        "options": {},
        "authentication": "genericCredentialType",
        "genericAuthType": "httpBasicAuth"
      },
      "credentials": {
        "httpBasicAuth": {
          "id": "vTR4WWA7czRn2ULn",
          "name": "Airflow"
        }
      },
      "typeVersion": 4.2
    },
    {
      "id": "fb2211d5-cef2-4be2-b281-de315aa07093",
      "name": "Switch: state",
      "type": "n8n-nodes-base.switch",
      "position": [
        980,
        -40
      ],
      "parameters": {
        "rules": {
          "values": [
            {
              "outputKey": "=success",
              "conditions": {
                "options": {
                  "version": 2,
                  "leftValue": "",
                  "caseSensitive": true,
                  "typeValidation": "strict"
                },
                "combinator": "and",
                "conditions": [
                  {
                    "id": "4d4ecf8a-c02b-4722-9528-1919cdf9b83e",
                    "operator": {
                      "name": "filter.operator.equals",
                      "type": "string",
                      "operation": "equals"
                    },
                    "leftValue": "={{ $json.state }}",
                    "rightValue": "success"
                  }
                ]
              },
              "renameOutput": true
            },
            {
              "outputKey": "queued",
              "conditions": {
                "options": {
                  "version": 2,
                  "leftValue": "",
                  "caseSensitive": true,
                  "typeValidation": "strict"
                },
                "combinator": "and",
                "conditions": [
                  {
                    "operator": {
                      "type": "string",
                      "operation": "equals"
                    },
                    "leftValue": "={{ $json.state }}",
                    "rightValue": "queued"
                  }
                ]
              },
              "renameOutput": true
            },
            {
              "outputKey": "running",
              "conditions": {
                "options": {
                  "version": 2,
                  "leftValue": "",
                  "caseSensitive": true,
                  "typeValidation": "strict"
                },
                "combinator": "and",
                "conditions": [
                  {
                    "id": "fa5d8524-334a-4ab1-b543-bba75cafd0ec",
                    "operator": {
                      "name": "filter.operator.equals",
                      "type": "string",
                      "operation": "equals"
                    },
                    "leftValue": "={{ $json.state }}",
                    "rightValue": "running"
                  }
                ]
              },
              "renameOutput": true
            },
            {
              "outputKey": "failed",
              "conditions": {
                "options": {
                  "version": 2,
                  "leftValue": "",
                  "caseSensitive": true,
                  "typeValidation": "strict"
                },
                "combinator": "and",
                "conditions": [
                  {
                    "id": "dd853677-c51c-4c06-8680-3c9d1829d6bd",
                    "operator": {
                      "name": "filter.operator.equals",
                      "type": "string",
                      "operation": "equals"
                    },
                    "leftValue": "={{ $json.state }}",
                    "rightValue": "failed"
                  }
                ]
              },
              "renameOutput": true
            }
          ]
        },
        "options": {
          "fallbackOutput": 3
        }
      },
      "typeVersion": 3.2
    },
    {
      "id": "5941496a-a64d-432c-9103-e7bcae4b8d67",
      "name": "in data",
      "type": "n8n-nodes-base.executeWorkflowTrigger",
      "position": [
        100,
        -40
      ],
      "parameters": {
        "workflowInputs": {
          "values": [
            {
              "name": "dag_id"
            },
            {
              "name": "task_id"
            },
            {
              "name": "conf"
            },
            {
              "name": "wait",
              "type": "number"
            },
            {
              "name": "wait_time",
              "type": "number"
            }
          ]
        }
      },
      "typeVersion": 1.1
    },
    {
      "id": "e77fed4a-b61a-4126-8be3-43ef8a832cfe",
      "name": "Wait",
      "type": "n8n-nodes-base.wait",
      "position": [
        700,
        -40
      ],
      "webhookId": "3d164954-2926-4174-afc1-261dfdfa9e46",
      "parameters": {
        "amount": "={{ $('in data').item.json.hasOwnProperty('wait') ? $('in data').item.json.wait : 10 }}"
      },
      "typeVersion": 1.1
    },
    {
      "id": "8ffae842-4400-4667-85bb-50644ef10fc0",
      "name": "If count > wait_time",
      "type": "n8n-nodes-base.if",
      "position": [
        1320,
        140
      ],
      "parameters": {
        "options": {},
        "conditions": {
          "options": {
            "version": 2,
            "leftValue": "",
            "caseSensitive": true,
            "typeValidation": "strict"
          },
          "combinator": "and",
          "conditions": [
            {
              "id": "1829d538-5633-4f5c-ad1b-285b084b35ee",
              "operator": {
                "type": "number",
                "operation": "gt"
              },
              "leftValue": "={{ $json.count }}",
              "rightValue": "={{ $('in data').item.json.hasOwnProperty('wait_time') ? $('in data').item.json.wait_time : 12 }}"
            }
          ]
        }
      },
      "typeVersion": 2.2
    },
    {
      "id": "c49d4a1b-6f25-471b-9c21-d3defb709dda",
      "name": "airflow-api",
      "type": "n8n-nodes-base.set",
      "position": [
        240,
        60
      ],
      "parameters": {
        "options": {},
        "assignments": {
          "assignments": [
            {
              "id": "40a5ab5b-dee1-44c4-910a-d6b85af75aed",
              "name": "prefix",
              "type": "string",
              "value": "https://airflow.example.com"
            }
          ]
        }
      },
      "typeVersion": 3.4
    }
  ],
  "active": false,
  "pinData": {
    "in data": [
      {
        "json": {
          "conf": "{
  \"image\": \"nginx\",
  \"tag\": \"latest\",
  \"tag_requested\": 1000
}",
          "wait": 10,
          "dag_id": "image_tag_related",
          "task_id": "image_tag_related",
          "wait_time": 18
        }
      }
    ]
  },
  "settings": {
    "executionOrder": "v1"
  },
  "versionId": "57fdbcfc-7950-4aff-9ac7-3c2a99a2c8c8",
  "connections": {
    "Wait": {
      "main": [
        [
          {
            "node": "Airflow: dag_run - state",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "count": {
      "main": [
        [
          {
            "node": "If count > wait_time",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "in data": {
      "main": [
        [
          {
            "node": "airflow-api",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "airflow-api": {
      "main": [
        [
          {
            "node": "Airflow: dag_run",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Switch: state": {
      "main": [
        [
          {
            "node": "Airflow: dag_run - get result",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "count",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "count",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "dag run fail",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Airflow: dag_run": {
      "main": [
        [
          {
            "node": "if state == queued",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "if state == queued": {
      "main": [
        [
          {
            "node": "Wait",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "dag run fail",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "If count > wait_time": {
      "main": [
        [
          {
            "node": "dag run wait too long",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "Wait",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Airflow: dag_run - state": {
      "main": [
        [
          {
            "node": "Switch: state",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Airflow: dag_run - get result": {
      "main": [
        []
      ]
    }
  }
}

功能特点

  • 自动检测新邮件
  • AI智能内容分析
  • 自定义分类规则
  • 批量处理能力
  • 详细的处理日志

技术分析

节点类型及作用

  • Httprequest
  • Code
  • Stopanderror
  • If
  • Switch

复杂度评估

配置难度:
★★★★☆
维护难度:
★★☆☆☆
扩展性:
★★★★☆

实施指南

前置条件

  • 有效的Gmail账户
  • n8n平台访问权限
  • Google API凭证
  • AI分类服务订阅

配置步骤

  1. 在n8n中导入工作流JSON文件
  2. 配置Gmail节点的认证信息
  3. 设置AI分类器的API密钥
  4. 自定义分类规则和标签映射
  5. 测试工作流执行
  6. 配置定时触发器(可选)

关键参数

参数名称 默认值 说明
maxEmails 50 单次处理的最大邮件数量
confidenceThreshold 0.8 分类置信度阈值
autoLabel true 是否自动添加标签

最佳实践

优化建议

  • 定期更新AI分类模型以提高准确性
  • 根据邮件量调整处理批次大小
  • 设置合理的分类置信度阈值
  • 定期清理过期的分类规则

安全注意事项

  • 妥善保管API密钥和认证信息
  • 限制工作流的访问权限
  • 定期审查处理日志
  • 启用双因素认证保护Gmail账户

性能优化

  • 使用增量处理减少重复工作
  • 缓存频繁访问的数据
  • 并行处理多个邮件分类任务
  • 监控系统资源使用情况

故障排除

常见问题

邮件未被正确分类

检查AI分类器的置信度阈值设置,适当降低阈值或更新训练数据。

Gmail认证失败

确认Google API凭证有效且具有正确的权限范围,重新进行OAuth授权。

调试技巧

  • 启用详细日志记录查看每个步骤的执行情况
  • 使用测试邮件验证分类逻辑
  • 检查网络连接和API服务状态
  • 逐步执行工作流定位问题节点

错误处理

工作流包含以下错误处理机制:

  • 网络超时自动重试(最多3次)
  • API错误记录和告警
  • 处理失败邮件的隔离机制
  • 异常情况下的回滚操作